package com.ld.shieldsb.canalclient.etl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.base.Joiner;
import com.ld.shieldsb.canalclient.handler.config.AdapterConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.util.CanalUtil;
import com.ld.shieldsb.common.core.model.Result;
import com.ld.shieldsb.common.core.util.ResultUtil;
import com.ld.shieldsb.common.core.util.StringUtils;

public abstract class AbstractEtlService {

    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    protected String type;
    protected MappingConfig config;
    protected final long CNT_PER_TASK = 10000L; // max rows per transfer task; larger imports are split into chunks of this size
    protected volatile AtomicLong impCount; // rows imported by the current run (only one transfer runs at a time)
    protected long count = 0; // total row count of the current run; NOT thread-safe, single-threaded access only
    protected String uniqueKey; // unique key, updated for each transfer task
    protected Long lastBeginTime; // begin time (epoch millis) of the last run; NOT thread-safe
    protected Long lastEndTime; // end time (epoch millis) of the last run; NOT thread-safe
    protected volatile boolean finished; // whether the current run has finished

    public AbstractEtlService(String type, MappingConfig config) {
        this.type = type;
        this.config = config;
    }

    /**
     * Imports all rows of the mapped table ({@code SELECT * FROM database.table} built from the
     * configured {@link DbMapping}).
     *
     * @param params     optional ETL condition parameters, may be {@code null}
     * @param importType import mode, forwarded to {@code EtlServiceUtil.dealEtlConditionSql}
     * @param con        progress callback, invoked at the begin / count / end stages
     * @return result carrying the number of imported rows, or the accumulated error messages
     */
    public Result importData(List<String> params, int importType, Consumer<EtlConsumer> con) {
        DbMapping dbMapping = config.getDbMapping();
        String sql = "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable();
        return importData(sql, params, importType, con);
    }

    /**
     * Runs a full import for the given base SQL: resolves the source data source, appends the
     * configured ETL condition/delta clause, counts the matching rows and transfers them (in
     * parallel chunks once the row count reaches {@link #CNT_PER_TASK}).
     *
     * <p>The end-state callback ({@code PROCESS_STATE_END}) is guaranteed to fire exactly once,
     * on both the success and every failure path.
     *
     * @param sql        base {@code SELECT} statement without condition
     * @param params     optional ETL condition parameters, may be {@code null}
     * @param importType import mode, forwarded to {@code EtlServiceUtil.dealEtlConditionSql}
     * @param con        progress callback, invoked at the begin / count / end stages
     * @return result carrying the number of imported rows, or the accumulated error messages
     */
    protected Result importData(String sql, List<String> params, int importType, Consumer<EtlConsumer> con) {
        long start = System.currentTimeMillis(); // run begin time
        long end = System.currentTimeMillis(); // run end time; updated again after a successful import
        lastBeginTime = start;
        lastEndTime = end;
        count = 0;
        Result etlResult = new Result();
        impCount = new AtomicLong();
        finished = false; // a new run is starting

        if (params == null) {
            params = new ArrayList<>();
        }

        List<String> errMsg = new ArrayList<>();
        if (config == null) {
            logger.warn("{} mapping config 未设置, 数据etl结束 ", type);
            etlResult.setErrorMessage(type + "映射参数未设置，数据etl结束 ");
            errMsg.add("mapping config 未设置, 数据etl结束");
            finished = true;
            // FIX: the original called config.getMapping() here, a guaranteed NPE inside the
            // config == null branch; the mapping is simply omitted from this callback.
            con.accept(EtlConsumer.builder().processState(EtlConsumer.PROCESS_STATE_END).sql(sql).errMsg(errMsg).success(false).build());

            return etlResult;
        }

        List<Object> values = new ArrayList<>();
        try {
            if (StringUtils.isEmpty(config.getDataSourceKey())) {
                errMsg.add("数据源未设置！");
                finished = true;
                con.accept(EtlConsumer.builder().processState(EtlConsumer.PROCESS_STATE_END).sql(sql).values(values)
                        .mapping(config.getMapping()).errMsg(errMsg).success(false).build());

                return ResultUtil.error("数据源未设置！");
            }
            DruidDataSource srcDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

            if (srcDS == null) {
                errMsg.add("数据源不存在！");
                finished = true;
                con.accept(EtlConsumer.builder().processState(EtlConsumer.PROCESS_STATE_END).sql(sql).values(values)
                        .mapping(config.getMapping()).errMsg(errMsg).success(false).build());

                return ResultUtil.error("数据源不存在！");
            }
            // begin callback
            con.accept(EtlConsumer.builder().processState(EtlConsumer.PROCESS_STATE_BEGIN).srcDS(srcDS).sql(sql).values(values)
                    .mapping(config.getMapping()).errMsg(errMsg).success(true).build());

            // append the configured ETL condition / delta clause; bound values go into `values`
            String etlCondition = config.getDbMapping().getEtlCondition();
            String etlDelta = config.getDbMapping().getEtlDelta();
            sql = EtlServiceUtil.dealEtlConditionSql(sql, params, etlCondition, etlDelta, importType, values);

            if (logger.isDebugEnabled()) {
                logger.debug("etl sql : {}", sql);
            }

            // total row count for this run
            count = EtlServiceUtil.getCountBySql(sql, srcDS, values);
            // count callback
            con.accept(EtlConsumer.builder().processState(EtlConsumer.PROCESS_STATE_GETCOUNT).srcDS(srcDS).sql(sql).values(values)
                    .mapping(config.getMapping()).errMsg(errMsg).success(true).count(count).build());

            // FIX: removed leftover test code "Thread.sleep(1000 * 60 * 5)" that blocked
            // every import for five minutes.

            dataImport(sql, impCount, errMsg, srcDS, values, count, con);

            end = System.currentTimeMillis();
            lastEndTime = end;
            logger.info("数据全量导入完成, 一共导入 {} 条数据, 耗时: {}", impCount.get(), end - start);
            etlResult.setSuccessMessage("导入" + type + " 数据：" + impCount.get() + " 条");
            etlResult.setData(impCount.get()); // total imported rows
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // FIX: restore the interrupt flag instead of swallowing it
            }
            logger.error(e.getMessage(), e);
            errMsg.add(type + " 数据导入异常 =>" + e.getMessage());
        }
        if (errMsg.isEmpty()) {
            etlResult.setSuccess(true);
        } else {
            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
        }
        // mark the run finished
        finished = true;
        // end callback — always invoked, success or failure
        // NOTE(review): on the failure path `end` still holds the initial timestamp taken at
        // method entry; confirm whether callers expect the actual failure time here.
        con.accept(EtlConsumer.builder().processState(EtlConsumer.PROCESS_STATE_END).beginTime(start).endTime(end).count(count)
                .impCount(impCount).success(etlResult.getSuccess()).errMsg(errMsg).sql(sql).values(values).build());
        return etlResult;
    }

    /**
     * Transfers the rows selected by {@code sql}. Once the row count reaches
     * {@link #CNT_PER_TASK} the work is split into {@code LIMIT offset,size} chunks executed on
     * a fixed thread pool sized to the number of available processors; otherwise a single
     * synchronous {@link #executeSqlImport} call is made.
     *
     * @param sql      fully-conditioned SELECT statement
     * @param impCount shared counter of imported rows, incremented by the workers
     * @param errMsg   shared list collecting worker error messages
     * @param srcDS    source data source
     * @param values   bound parameter values for {@code sql}
     * @param cnt      total row count (drives the single/parallel decision and chunking)
     * @param con      progress callback forwarded to the workers
     * @throws InterruptedException if waiting for a worker is interrupted
     * @throws ExecutionException   if a worker task fails
     */
    private void dataImport(String sql, AtomicLong impCount, List<String> errMsg, DruidDataSource srcDS, List<Object> values, long cnt,
            Consumer<EtlConsumer> con) throws InterruptedException, ExecutionException {
        // go multi-threaded once the row count reaches the per-task chunk size
        if (cnt >= CNT_PER_TASK) { // FIX: was a duplicated literal 10000; now tied to the constant so the two cannot drift
            int threadCount = Runtime.getRuntime().availableProcessors();

            long size = CNT_PER_TASK;
            long workerCnt = cnt / size + (cnt % size == 0 ? 0 : 1); // ceil(cnt / size)

            if (logger.isDebugEnabled()) {
                logger.debug("workerCnt {} for cnt {} threadCount {}", workerCnt, cnt, threadCount);
            }

            ExecutorService executor = CanalUtil.newFixedThreadPool(threadCount, 5000L);
            try {
                List<Future<Boolean>> futures = new ArrayList<>();
                for (long i = 0; i < workerCnt; i++) {
                    long offset = size * i;
                    // MySQL-style "LIMIT offset,size" paging; assumes a stable row order between chunks
                    String sqlFinal = sql + " LIMIT " + offset + "," + size;
                    Future<Boolean> future = executor
                            .submit(() -> executeSqlImport(srcDS, sqlFinal, values, cnt, config.getMapping(), impCount, errMsg, con));
                    futures.add(future);
                }

                for (Future<Boolean> future : futures) {
                    future.get(); // wait for each chunk and propagate worker failures
                }
            } finally {
                executor.shutdown(); // FIX: previously skipped (pool leak) when future.get() threw
            }
        } else {
            executeSqlImport(srcDS, sql, values, cnt, config.getMapping(), impCount, errMsg, con);
        }
    }

    /**
     * Computes the number of rows a {@code SELECT * FROM database.table} import would transfer.
     *
     * @param dataSourceKey key of the source data source in {@code DatasourceConfig.DATA_SOURCES}
     * @param database      source database name
     * @param table         source table name
     * @param params        optional ETL condition parameters
     * @param importType    import mode, forwarded to {@code EtlServiceUtil.dealEtlConditionSql}
     * @return result whose data is the row count, or an error result
     */
    public Result getImportDataNum(String dataSourceKey, String database, String table, List<String> params, int importType) {
        String sql = "SELECT * FROM " + database + "." + table;
        return getImportDataNum(dataSourceKey, sql, params, importType);
    }

    /**
     * Computes the number of rows the given SQL would transfer after the configured ETL
     * condition/delta clause has been applied.
     *
     * @param dataSourceKey key of the source data source in {@code DatasourceConfig.DATA_SOURCES}
     * @param sql           base {@code SELECT} statement without condition
     * @param params        optional ETL condition parameters
     * @param importType    import mode, forwarded to {@code EtlServiceUtil.dealEtlConditionSql}
     * @return result whose data is the row count, or an error result
     */
    protected Result getImportDataNum(String dataSourceKey, String sql, List<String> params, int importType) {
        Result etlResult = new Result();
        List<String> errMsg = new ArrayList<>();

        try {
            if (StringUtils.isEmpty(dataSourceKey)) {
                return ResultUtil.error("数据源未设置！");
            }
            DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(dataSourceKey);

            if (dataSource == null) {
                return ResultUtil.error("数据源不存在！");
            }

            List<Object> values = new ArrayList<>();
            // append the configured ETL condition / delta clause; bound values go into `values`
            String etlCondition = config.getDbMapping().getEtlCondition();
            String etlDelta = config.getDbMapping().getEtlDelta();
            sql = EtlServiceUtil.dealEtlConditionSql(sql, params, etlCondition, etlDelta, importType, values);

            if (logger.isDebugEnabled()) {
                logger.debug("etl sql : {}", sql);
            }

            // row count only — no data is transferred here
            long cnt = EtlServiceUtil.getCountBySql(sql, dataSource, values);
            etlResult.setData(cnt);

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            errMsg.add(type + " 数据导入异常 =>" + e.getMessage());
        }
        if (errMsg.isEmpty()) {
            etlResult.setSuccess(true);
        } else {
            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
        }
        return etlResult;
    }

    public String getUniqueKey() {
        return uniqueKey;
    }

    public void setUniqueKey(String uniqueKey) {
        this.uniqueKey = uniqueKey;
    }

    public AtomicLong getImpCount() {
        return impCount;
    }

    public Long getCount() {
        return count;
    }

    public String getType() {
        return type;
    }

    public MappingConfig getConfig() {
        return config;
    }

    public boolean isFinished() {
        return finished;
    }

    public Long getLastBeginTime() {
        return lastBeginTime;
    }

    public Long getLastEndTime() {
        return lastEndTime;
    }

    /**
     * Performs the actual row transfer for one SQL chunk. Implementations must be thread-safe:
     * in the parallel path this is invoked concurrently from multiple pool threads sharing
     * {@code impCount} and {@code errMsg}.
     *
     * @return {@code true} when the chunk was imported successfully
     */
    protected abstract boolean executeSqlImport(DataSource srcDS, String sql, List<Object> values, long cnt,
            AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg, Consumer<EtlConsumer> con);

}
